
Conversation

@jorisvandenbossche (Member) commented Dec 12, 2019

Follow-up on #5237 adding a higher-level API for datasets

@github-actions

Thanks for opening a pull request!

Could you open an issue for this pull request on JIRA?
https://issues.apache.org/jira/browse/ARROW

Then could you also rename the pull request title in the following format?

ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}


@kszucs (Member) commented Dec 13, 2019

@jorisvandenbossche it can be rebased now

Member:

How about adding this functionality to FileSystemDataSourceDiscovery's paths_or_selector argument handling?

Member Author:

> How about adding this functionality to FileSystemDataSourceDiscovery's paths_or_selector argument handling?

Sounds good

Member:

I wonder how we should express support for not just one, but multiple data sources.
For example, opening a dataset with a source from S3 and a local one:

open_dataset(['s3://bucket/base/path', 'local://base/path'])

We'll probably need more functions, but I cannot think of a meaningful name right now.

Member:

Or in this case will we simply require the user to construct the data sources and the dataset manually?

Member Author:

I personally think this advanced use case should not necessarily be handled by such a higher-level function (not sure how the dataset discovery in C++ will handle this), so it is indeed up to the user to construct the data sources manually.

Member:

I agree with Joris.

@jorisvandenbossche jorisvandenbossche changed the title WIP: [Python] Add higher level open_dataset function ARROW-7432: [Python] Add higher level open_dataset function Dec 18, 2019
@jorisvandenbossche jorisvandenbossche marked this pull request as ready for review December 18, 2019 14:43

Member Author:

@kszucs in the other dataset tests you have been using a MockFilesystem, while here I used the pytest tempdir fixture, as we do in other tests. Was there a specific reason to use the mock filesystem (or a reason I should also use it here)?

Member:

Less IO, but it's fine to use the local filesystem.

Member:

FWIW, this is a little different from what R does: https://github.com/apache/arrow/blob/master/r/R/dataset.R#L23-L29

Also, I think this can/should change when #6008 lands. IIUC discovery will automatically identify hive partitions. You may also want to accept a list of strings as the names of the (unnamed, non-hive) partition segments (or perhaps this is to be done on https://issues.apache.org/jira/browse/ARROW-7413).

@jorisvandenbossche jorisvandenbossche force-pushed the dataset-python branch 2 times, most recently from 807c250 to 9b3e646 on January 14, 2020 12:17
@jorisvandenbossche (Member Author):

OK, I updated the PR to add helper functions for the partitioning as well, and to use them in open_dataset. (I am still using the old names for now for consistency with the existing code; I will change that when the renaming lands. I also still need to add docstrings if we agree on this pattern.)

I now created a single helper function per partitioning scheme that can return either a factory or an actual partitioning (as discussed in #6151 (comment), and similar to what is done in R).
I agree with @kszucs that in general it's not super nice API design to return different classes from the same function depending on argument type, but I think in this case it gives a nicer user experience.

So with the current low-level API it looks like (in case of a hive partitioning):

# explicit scheme
ds.open_dataset(sources, partition_scheme=ds.HivePartitionScheme(schema))
# using discovery/factory
ds.open_dataset(sources, partition_scheme=ds.HivePartitionScheme.discover())
# or ds.open_dataset(sources, partition_scheme=ds.HivePartitionSchemeDiscovery())

with what I just pushed, the higher level API becomes

# explicit scheme
ds.open_dataset(sources, partition_scheme=ds.hive_partition_scheme(schema))
# using discovery/factory
ds.open_dataset(sources, partition_scheme=ds.hive_partition_scheme())

with distinct functions for explicit creation vs factory/discovery, it could look like:

# explicit scheme
ds.open_dataset(sources, partition_scheme=ds.hive_partition_scheme(schema))
# using discovery/factory
ds.open_dataset(sources, partition_scheme=ds.hive_partition_scheme_factory())

(or we could also decide to be fine with the class based API of course)

@jorisvandenbossche (Member Author):

From the discussion on Zulip, it's also an option to have a single partition_scheme() (or partitioning() after the rename) with a flavor keyword, instead of separate functions:

partitioning(schema)  # explicit schema scheme
partitioning(['date', 'client'])  # error: positional argument must be schema
partitioning(field_names=['date', 'client'])  # discover schema scheme

partitioning(schema, flavor='hive') # explicit hive
partitioning(flavor='hive')  # discover everything
partitioning(field_names=['date', 'client'], flavor='hive') # warning: hive partitioning ignores field_names

or without requiring the positional argument to be a Schema (combining it with field_names):

partitioning(['date', 'client'])  # schema-scheme
partitioning(['date', 'client'], flavor='hive')
partitioning(flavor='hive')  # discover everything
partitioning(schema)  # explicit schema scheme
partitioning(schema, flavor='hive') # explicit hive
partitioning(lambda ...)  # function partitioning

@bkietz (Member) commented Jan 14, 2020

Combining field_names with hive flavor is not currently supported and I don't think it is useful to add that support. I guess the use case would be to ignore segments from a path like /base_dir/date=3/client=4/not_in_field_names_so_ignored=5/dat_345.parquet; I don't see when that would be necessary. There's negligible performance cost for the extra not_in_field_names_so_ignored == 5 partition expression. The argument might be made that this would allow hive-named directories which don't correspond to partition expressions, but that seems like unnecessary complexity/a footgun to me. Note that hive-like segments preceding the base dir are already ignored: /correctly_ignored=5/base_dir/a=3/b=4/dat results in a partition expression of "a"_ == 3 and "b"_ == 4.

@kszucs (Member) commented Jan 14, 2020

I've created a high-level API prototype including support for multiple data sources. @jorisvandenbossche PTAL.

Because the C++ API uses std::variant for the partition scheme, we need to return different types from the helper functions (like partitioning(...)).

If we want to unify the open_dataset / dataset function to support both single and multiple sources, we should add a FileSystem.from_uri static method.
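
A hedged sketch of how such a from_uri helper could be used (the method did not exist yet at this point, so the exact call shape here is an assumption about the proposal):

from pyarrow import fs

# resolve a URI into a concrete filesystem instance plus the path within it
# (proposed static method; shown here as an assumption about its shape)
filesystem, path = fs.FileSystem.from_uri('s3://bucket/base/path')

# a unified dataset()/open_dataset() could do this resolution for each source
# URI, so callers would not need to pass filesystem objects explicitly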

@jorisvandenbossche (Member Author):

Thanks @kszucs!
So I think we agree on the unified partitioning() function; I can update that here.

Further, the main differences / discussion points are (focusing on the single source use case):

  • Do we want to support a dataset(path) shortcut for creating a dataset from a single path-source? Or do we require the user to explicitly do dataset(source(path)) (or maybe even dataset([source(path)]))? (A sketch comparing the two forms follows this list.)
    I think for the majority of use cases (which will be single source), it will be less verbose if you can directly create a Dataset with dataset(path). On the other hand, that means that we need to add all the keywords which are only for creating the Source (filesystem, format, partitioning, ...) to the dataset(..) function as well.
    I am personally fine with starting with the explicit (more verbose) API; we can later still simplify it and provide the shortcut, if deemed necessary/useful.
  • Naming bike-shedding: dataset(..) vs open_dataset(..) ?
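
As a rough illustration of the first bullet (purely a sketch; the source()/dataset() names follow this thread and the keyword arguments are assumptions, not a settled API):

import pyarrow.dataset as ds

# shortcut: dataset() accepts a path directly and forwards the
# source-creation keywords (filesystem, format, partitioning, ...)
dataset = ds.dataset('/path/to/data', format='parquet')

# explicit: construct the source first, then build the dataset from it
src = ds.source('/path/to/data', format='parquet')
dataset = ds.dataset(src)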

> If we want to unify the open_dataset / dataset function to support both single and multiple sources, we should add a FileSystem.from_uri static method.

That's something we want to add anyhow, no? (also for easily creating a source without needing to specify the filesystem as an object)

@kszucs (Member) commented Jan 14, 2020

@bkietz the partition fields and values are included in the record batches coming from the scan.

I guess those can be ignored if we pass a whitelist to the scanner projection, excluding the unnecessary partition fields. Either way, it seems handy to me.

@kszucs (Member) commented Jan 14, 2020

> • Do we want to support a dataset(path) shortcut for creating a dataset from a single path-source? Or do we require the user to explicitly do dataset(source(path)) (or maybe even dataset([source(path)]))?
>   I think for the majority of use cases (which will be single source), it will be less verbose if you can directly create a Dataset with dataset(path). On the other hand, that means that we need to add all the keywords which are only for creating the Source (filesystem, format, partitioning, ...) to the dataset(..) function as well.

Agree, and good question :)
Perhaps it also depends on what users understand by the word "dataset".

> I am personally fine with starting with the explicit (more verbose) API; we can later still simplify it and provide the shortcut, if deemed necessary/useful.

I'm fine with both. If we can express both cases with a single function signature we can go with that.

> • Naming bike-shedding: dataset(..) vs open_dataset(..)?

Technically we're not opening a dataset, rather describing and discovering one, although the latter name (open_dataset) could be better because of the clash with the pyarrow.dataset namespace. Modifying the Dataset class' signature is also an option.

> If we want to unify the open_dataset / dataset function to support both single and multiple sources, we should add a FileSystem.from_uri static method.

> That's something we want to add anyhow, no? (also for easily creating a source without needing to specify the filesystem as an object)

Definitely, but it could help with expressing the single-source shortcut.

@kszucs (Member) commented Jan 14, 2020

@jorisvandenbossche please incorporate the changes you agree with from my branch

@jorisvandenbossche (Member Author):

> Naming bike-shedding: dataset(..) vs open_dataset(..)?

> Technically we're not opening a dataset, rather describing and discovering one, although the latter name (open_dataset) could be better because of the clash with the pyarrow.dataset namespace. Modifying the Dataset class' signature is also an option.

I agree that the "open" is not really needed, but indeed, a function name identical to the module name is what holds me back as well...

@kszucs (Member) commented Jan 14, 2020

Let's defer the naming for later then, and use open_dataset for consistency with R for now.

@jorisvandenbossche (Member Author):

I already added the single partitioning() function (with docs and basic tests). @kszucs, what I changed compared to your version is to not return None for the "no partitioning"/invalid flavor case (the function always returns a PartitionScheme or PartitionSchemeDiscovery).
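
For illustration, a minimal sketch of that dispatch (simplified; the SchemaPartitionScheme name, the import path, and the discover() signatures are assumptions based on the pre-rename classes discussed above, not the exact code in the PR):

from pyarrow.dataset import HivePartitionScheme, SchemaPartitionScheme  # pre-rename names (assumption)

def partitioning(schema=None, field_names=None, flavor=None):
    # always return a PartitionScheme or PartitionSchemeDiscovery;
    # invalid combinations raise instead of returning None
    if flavor is None:
        if schema is not None:
            if field_names is not None:
                raise ValueError(
                    "Cannot specify both 'schema' and 'field_names'")
            return SchemaPartitionScheme(schema)
        elif field_names is not None:
            return SchemaPartitionScheme.discover(field_names)
        raise ValueError("Specify either 'schema' or 'field_names'")
    elif flavor == 'hive':
        if field_names is not None:
            raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
        if schema is not None:
            return HivePartitionScheme(schema)
        return HivePartitionScheme.discover()
    raise ValueError("Unsupported flavor: {!r}".format(flavor))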

@kszucs (Member) commented Jan 14, 2020

@jorisvandenbossche Thanks! Do you want to add support for multiple sources, or shall we handle it in a follow-up?

@jorisvandenbossche (Member Author):

I was not planning to do that here (we don't yet have support for that in Python, so I don't think we can test it?).
But I can already make the source/dataset distinction?

@kszucs (Member) commented Jan 15, 2020

@jorisvandenbossche the source/dataset distinction would be nice. I can add the multi source support later based on my branch as a follow-up.

@jorisvandenbossche (Member Author):

OK, updated this with separate source() and dataset() functions.

In the end, I went with allowing dataset() to be used directly with a path, also for a single source. It's not difficult to implement, it's similar to the API that @nealrichardson sketched in chat, and it just makes it easier to use.

The source(..) function returns a SourceFactory, and not a Source, because in the future (when dealing with multi-source datasets), the dataset(..) function should still be able to inspect the schemas, unify them, and finish the SourceFactories with this unified schema.
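
A rough sketch of that future multi-source flow (hypothetical; inspect()/finish() and the unify_schemas() step are assumptions about how the pieces would fit together, not the API implemented in this PR):

import pyarrow as pa
import pyarrow.dataset as ds

# each source() call returns a SourceFactory rather than a finished Source
factories = [ds.source('/data/part1'), ds.source('/data/part2')]

# dataset() can inspect the schema of every factory, unify them, and only
# then finish the factories with the common schema
schemas = [factory.inspect() for factory in factories]
schema = pa.unify_schemas(schemas)
sources = [factory.finish(schema) for factory in factories]

dataset = ds.Dataset(sources, schema)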

@jorisvandenbossche (Member Author) commented Jan 15, 2020

@kszucs please take a look!

(I still need to add some additional tests to cover new aspects of the API)

@bkietz (Member) left a comment:

This is looking great, thanks for doing this!

Member:

I don't think an argument named field_names should accept a schema. Please move it to a separate argument, or maybe rename it to schema_or_field_names.

Member Author:

Yeah, I agree. @kszucs combined them in his prototype. @kszucs, would you be fine with having separate keyword arguments for schema and field_names?

Member:

There's a lot of repeated code in these tests. Please parameterize them or write helpers

Member Author:

Can you specify a bit more? Which aspect do you find repetitive?

The creation of the data? (But this is different for each test.) Or the way that the resulting dataset is checked? That could indeed be written with a helper function, but it might also point to a too verbose API (e.g. @kszucs was adding a Dataset.to_table method so that dataset.new_scan().finish().to_table() could be shortened to dataset.to_table()).
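
For reference, such a convenience method essentially wraps the chain above (hypothetical sketch; the actual method on @kszucs's branch may differ):

def to_table(dataset):
    # equivalent to dataset.new_scan().finish().to_table(), spelled out once
    return dataset.new_scan().finish().to_table()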

Member:

Here's a sketch of some helpers and a rewrite of test_open_dataset_directory which should clarify what I mean. I won't block on this; the test suite isn't super long yet.

For data creation:

# module-level imports assumed by the helpers and the test below
import pathlib

import pytest

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq


def write_parquet_table(columns, path):
    assert isinstance(path, pathlib.Path)
    path.parent.mkdir(exist_ok=True)

    table = pa.table(columns)
    pq.write_table(table, path)
    return table

For dataset extraction:

def dataset_table(path, **kwargs):
    assert isinstance(path, pathlib.Path)
    dataset = ds.dataset(path, **kwargs)
    table = dataset.to_table()
    # verify equivalently creatable from string path
    assert ds.dataset(str(path)).to_table().equals(table)
    return dataset, table


@pytest.mark.parquet
def test_open_dataset(tempdir):
    columns = {'a': range(9), 'b': [0.] * 4 + [1.] * 5}
    expected = write_parquet_table(columns, tempdir / 'test.parquet')
    dataset, actual = dataset_table(tempdir / 'test.parquet')
    assert actual.replace_schema_metadata().equals(expected)

@jorisvandenbossche (Member Author) left a comment:

@bkietz thanks for the review!

@jorisvandenbossche (Member Author):

OK, I updated this after the renaming. @kszucs can you have a look?

@nealrichardson (Member) left a comment:

I'm merging this even though there are a couple of pending discussions. I suggest that any further revision can happen in the context of https://issues.apache.org/jira/browse/ARROW-7431 (adding docs), which may lead us to other ergonomic changes anyway. @jorisvandenbossche can you pick that issue up next?
